package ds_industry_2025.ds.YangJuan_2024.T1

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

/*
    抽取 shtd_store 库中 base_region 的增量数据进入 Hive 的 ods 库中表 base_region。根据
ods.base_region 表中 id 作为增量字段，只将新增的数据抽入，字段名称、类型不变并
添加字段 create_time 取当前时间，同时添加静态分区，分区字段为 etl_date，类型为
String，且值为当前比赛日的前一天日期（分区字段格式为 yyyyMMdd）。使用 hive cli
执行 show partitions ods.base_region 命令，将结果截图粘贴至客户端桌面【Release\任务
B 提交结果.docx】中对应的任务序号下；
 */
object t4 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("t4")
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions","org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .enableHiveSupport()
      .getOrCreate()

    val conn = new Properties()
    conn.setProperty("user", "root")
    conn.setProperty("password", "123456")
    conn.setProperty("driver", "com.mysql.jdbc.Driver")

    val day = Calendar.getInstance()
    val current_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(day.getTime)
    day.add(Calendar.DATE, -1)
    val yesterday = new SimpleDateFormat("yyyyMMdd").format(day.getTime)

    val max_id = spark.table("ods.base_region")
      .agg(max(col("id")))
      .collect()(0).get(0).toString

    spark.read.jdbc("jdbc:mysql://192.168.40.110:3306/shtd_store?useSSL=false", "base_region", conn)
      .where(
        col("id") > lit(max_id).cast("int")
      )
      .withColumn("create_time", to_timestamp(lit(current_time)))
      .withColumn("etl_date", lit(yesterday))
      .write.mode("append")
      .partitionBy("etl_date")
      .saveAsTable("ods.base_region")


    spark.close()
  }

}
